package httpProxy

import (
	"encoding/json"
	"github.com/rs/xid"
	"github.com/wumingyu12/wensMqtt/common"
	"github.com/wumingyu12/wensMqtt/model"
	"log"
	"strings"
	"time"
)

// HttpMqttProxy sends API requests to MQTT clients and waits for their
// replies. All publishing and subscribing goes through base, which SetUp
// creates.
type HttpMqttProxy struct {
	base *model.BaseLink // MQTT link; nil until SetUp has been called
}

// newonfig returns the MQTT connection settings that SetUp passes to the
// base link. Every value is fixed at compile time.
func newonfig() model.Config {
	// Earlier configurations, kept for reference:
	//   - UserName "resetfulapi", PassWord "12345678", ClientId "resetfulapi-"
	//     followed by a random number below 100000 (seeded from the current time).
	//   - UserName, PassWord and ClientId all "htymqttbridge", with
	//     BrokerUrl "tcp://wmqtt.wens.com.cn:1883".
	//
	// Note on ClientId: a given client id can only hold one connection at a
	// time; if connecting fails because of that, switch to a different id.
	//
	// Note on IsCleanSession: clearing the session is not strictly required.
	// We found that the emqtt server drops the session by itself when the
	// client has not reconnected for a long time, even with this set to false.
	return model.Config{
		BrokerUrl:      "tcp://127.0.0.1:1883",
		ClientId:       "htymqttbridge",
		UserName:       "wlw",
		PassWord:       "wens@wlw",
		IsCleanSession: true,
	}
}
// SetUp creates the base MQTT link, configures it with newonfig, and starts
// it with keepalive. It must be called before Proxy, which uses the link.
func (self *HttpMqttProxy) SetUp() {
	link := &model.BaseLink{}
	self.base = link
	link.SetUp(newonfig())

	// The callback handed to StartLinkWithKeepalive is intentionally empty:
	// the HTTP and TCP bridge setup calls (setupHttpBridge, setupTcpBridge)
	// that used to live here are disabled.
	onLinked := func() {}
	link.StartLinkWithKeepalive(onLinked)
}

// Proxy sends req to the node whose MQTT client id is clientId, much like
// calling an HTTP API on that node, and waits for the node's reply.
//
// The request topic is common.Const_apisub with "{{clientId}}" replaced by
// clientId and every "+" replaced by a freshly generated session id; the
// reply topic is the request topic followed by common.Const_resptail.
//
// Returns:
//   - the reply payload and a nil error when a reply arrives within 8 seconds
//     (if unsubscribing from the reply topic then fails, that error is
//     returned together with the payload);
//   - the bytes of "httpProxy 超时" and a nil error when no reply arrives in
//     time (kept for backward compatibility with existing callers);
//   - a nil payload and a non-nil error when req cannot be encoded as JSON or
//     the publish fails.
func (self *HttpMqttProxy) Proxy(clientId string, req *common.ApiRequset) (res []byte, err error) {
	// A random session id keeps the topics of concurrent calls apart.
	sessionId := xid.New().String()
	apiReqTopic := strings.Replace(common.Const_apisub, "{{clientId}}", clientId, -1) // request topic
	apiReqTopic = strings.Replace(apiReqTopic, "+", sessionId, -1)
	apiResTopic := apiReqTopic + common.Const_resptail // reply topic

	js, err := json.Marshal(req)
	if err != nil {
		// Do not publish an empty payload when the request cannot be encoded.
		return nil, err
	}

	// Subscribe to the reply topic BEFORE publishing, so a fast reply cannot
	// arrive while nobody is listening. The payload travels through the
	// channel instead of being written to res from the callback, which would
	// race with a return on the timeout path.
	replyCh := make(chan []byte, 1)
	self.base.MqttSub(apiResTopic, func(topic string, msg []byte) {
		select {
		case replyCh <- msg:
		default:
			// A reply is already queued; drop extras rather than block the
			// MQTT callback.
		}
	})

	// Always remove the one-shot subscription, including on timeout and on
	// publish failure, so that subscriptions do not pile up.
	replied := false
	defer func() {
		unsubErr := self.base.UnSub(apiResTopic)
		if unsubErr == nil {
			return
		}
		if replied {
			// Same as before: an unsubscribe failure after a successful reply
			// is reported to the caller alongside the payload.
			err = unsubErr
			return
		}
		// On the other paths keep the result the caller already gets and
		// only record the cleanup failure.
		log.Println("httpProxy unsubscribe failed:", unsubErr)
	}()

	if err = self.base.MqttPub(apiReqTopic, js); err != nil {
		return nil, err
	}

	timer := time.NewTimer(8 * time.Second)
	defer timer.Stop()

	select {
	case <-timer.C:
		res = []byte("httpProxy 超时")
	case res = <-replyCh:
		replied = true
		log.Println("mqtt 有回复")
	}
	return res, err
}
